/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * license agreements; and to You under the Apache License, version 2.0:
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * This file is part of the Apache Pekko project, which was derived from Akka.
 */

/*
 * Copyright (C) 2018-2021 Lightbend Inc. <https://www.lightbend.com>
 */

package org.apache.pekko.grpc.interop;

import java.util.concurrent.CompletionStage;
import java.util.concurrent.CompletableFuture;

import org.apache.pekko.NotUsed;
import org.apache.pekko.grpc.GrpcServiceException;
import org.apache.pekko.stream.Materializer;
import org.apache.pekko.stream.javadsl.Flow;
import org.apache.pekko.stream.javadsl.Source;
import com.google.protobuf.ByteString;

// Generated by our plugin
import io.grpc.Status;
import io.grpc.testing.integration.EmptyProtos;
import io.grpc.testing.integration.Messages.*;
import io.grpc.testing.integration.TestService;

/**
 * Implementation of the generated {@link TestService}.
 *
 * <p>Essentially a port of {@code io.grpc.testing.integration.TestServiceImpl} to our APIs.
 *
 * <p>The same implementation can also be found as part of the 'scripted' tests at
 * /sbt-plugin/src/sbt-test/gen-scala-server/00-interop/src/main/java/org/apache/pekko/grpc/JavaTestServiceImpl.scala
 */
public class JavaTestServiceImpl implements TestService {

  /**
   * Shared, stateless flow that turns each {@link ResponseParameters} element into a
   * {@link StreamingOutputCallResponse} carrying a zero-filled payload of the requested size.
   */
  private static final Flow<ResponseParameters, StreamingOutputCallResponse, NotUsed>
      PARAMETERS_TO_RESPONSE_FLOW =
          Flow.<ResponseParameters>create()
              .map(parameters ->
                  StreamingOutputCallResponse.newBuilder()
                      .setPayload(zeroPayload(parameters.getSize()))
                      .build());

  /** Materializer used to run the request stream in {@link #streamingInputCall(Source)}. */
  private final Materializer mat;

  /**
   * Creates the service.
   *
   * @param mat materializer used to run incoming request streams
   */
  public JavaTestServiceImpl(Materializer mat) {
    this.mat = mat;
  }

  /**
   * Builds a payload whose body consists of {@code size} zero bytes.
   *
   * @param size number of bytes in the body
   * @return the payload
   */
  private static Payload zeroPayload(int size) {
    // A freshly allocated byte array is zero-filled.
    return Payload.newBuilder()
        .setBody(ByteString.copyFrom(new byte[size]))
        .build();
  }

  /**
   * Answers an empty request with an empty response.
   *
   * @param in the (ignored) empty request
   * @return an already-completed stage holding an empty message
   */
  @Override
  public CompletionStage<EmptyProtos.Empty> emptyCall(EmptyProtos.Empty in) {
    return CompletableFuture.completedFuture(EmptyProtos.Empty.newBuilder().build());
  }

  /**
   * Handles a unary call.
   *
   * <p>If the request carries a response status, the returned stage is failed with a
   * {@link GrpcServiceException} built from that status code and message. Otherwise the stage
   * completes with a response whose payload has {@code in.getResponseSize()} zero bytes.
   *
   * @param in the request
   * @return an already-completed (successfully or exceptionally) stage
   */
  @Override
  public CompletionStage<SimpleResponse> unaryCall(SimpleRequest in) {
    if (in.hasResponseStatus()) {
      EchoStatus requestStatus = in.getResponseStatus();
      Status status =
          Status.fromCodeValue(requestStatus.getCode())
              .withDescription(requestStatus.getMessage());
      CompletableFuture<SimpleResponse> failed = new CompletableFuture<>();
      failed.completeExceptionally(new GrpcServiceException(status));
      return failed;
    }

    return CompletableFuture.completedFuture(
        SimpleResponse.newBuilder()
            .setPayload(zeroPayload(in.getResponseSize()))
            .build());
  }

  /**
   * Delegates to {@link #unaryCall(SimpleRequest)}.
   *
   * @param in the request
   * @return the result of {@code unaryCall(in)}
   */
  @Override
  public CompletionStage<SimpleResponse> cacheableUnaryCall(SimpleRequest in) {
    return unaryCall(in);
  }

  /**
   * Emits one response per entry in the request's response-parameters list, each with a
   * zero-filled payload of the size given by that entry.
   *
   * @param in the request listing the desired responses
   * @return the stream of responses
   */
  @Override
  public Source<StreamingOutputCallResponse, NotUsed> streamingOutputCall(StreamingOutputCallRequest in) {
    return Source.from(in.getResponseParametersList())
        .via(PARAMETERS_TO_RESPONSE_FLOW);
  }

  /**
   * Sums the payload body sizes of all incoming requests.
   *
   * @param in the stream of requests
   * @return a stage completing with a response whose aggregated payload size is that sum
   */
  @Override
  public CompletionStage<StreamingInputCallResponse> streamingInputCall(Source<StreamingInputCallRequest, NotUsed> in) {
    return in
        .map(request -> request.getPayload().getBody().size())
        .runFold(0, (Integer total, Integer size) -> total + size, mat)
        .thenApply(total ->
            StreamingInputCallResponse.newBuilder()
                .setAggregatedPayloadSize(total)
                .build());
  }

  /**
   * For every incoming request, emits one response per entry in its response-parameters list.
   *
   * <p>A request that carries a response status makes the mapping stage throw a
   * {@link GrpcServiceException} built from that status code and message.
   *
   * @param in the stream of requests
   * @return the stream of responses
   */
  @Override
  public Source<StreamingOutputCallResponse, NotUsed> fullDuplexCall(Source<StreamingOutputCallRequest, NotUsed> in) {
    return in
        .map(request -> {
          if (request.hasResponseStatus()) {
            EchoStatus requested = request.getResponseStatus();
            throw new GrpcServiceException(
                Status.fromCodeValue(requested.getCode())
                    .withDescription(requested.getMessage()));
          }
          return request;
        })
        .mapConcat(request -> request.getResponseParametersList())
        .via(PARAMETERS_TO_RESPONSE_FLOW);
  }

  /**
   * Not implemented.
   *
   * <p>Returns a source that fails immediately rather than {@code null}, so that callers always
   * receive a usable {@link Source}. The failure uses the same exception type as
   * {@link #unimplementedCall(EmptyProtos.Empty)}.
   * NOTE(review): presumably the server-side exception handling reports
   * {@link UnsupportedOperationException} as UNIMPLEMENTED - confirm.
   *
   * @param in the (ignored) stream of requests
   * @return a source failed with {@link UnsupportedOperationException}
   */
  @Override
  public Source<StreamingOutputCallResponse, NotUsed> halfDuplexCall(Source<StreamingOutputCallRequest, NotUsed> in) {
    return Source.failed(new UnsupportedOperationException("halfDuplexCall is not implemented"));
  }

  /**
   * Deliberately unimplemented call.
   *
   * @param in the (ignored) empty request
   * @return never returns normally
   * @throws UnsupportedOperationException always
   */
  @Override
  public CompletionStage<EmptyProtos.Empty> unimplementedCall(EmptyProtos.Empty in) {
    throw new UnsupportedOperationException();
  }

}
